Snowflakeで定期的にCOPYコマンドを実行するタスクを作ってみた
こんにちは!エノカワです。
Snowflakeには、SQLステートメントを実行するタスクという機能があります。
スケジュールを定義して、テーブルへのデータ投入などの定期的な作業を実行することができます。
また、タスクAの実行後にタスクBを実行するといった依存関係を定義することもできます。
今回は、定期的にCOPY
コマンドを実行するタスクを作ってみました。
前提条件
Citi Bikeというシェアサイクリングサービスの利用者に関するデータを使用します。
以下の記事を参考にロードするデータの準備を行います。
ロードするデータの準備が完了すると、以下が作成された状態になります。
- データベース:CITIBIKE
- テーブル:TRIPS
- ステージ:CITIBIKE_TRIPS ※今回使用しません
- ファイル形式:CSV
タスクの概要
以下のようなイメージで動くタスクを作成します。
- タスクA
- 毎時0分に起動する
COPY
コマンドでTRIPS
テーブルのデータをステージのファイルにアンロードする- アンロードするファイルは年ごとに分割する
- タスクB
- タスクAの実行完了後に起動する
COPY
コマンドでステージにあるファイルのデータをテーブルにロードする- ロード先のテーブルは年ごとに分かれており、一致する年のファイルをロードする
使用するテーブル
TRIPS
テーブルを使用します。
テーブルの構造は下記のようになっています。
STARTTIME
とSTOPTIME
は、シェアサイクリングサービスの利用開始時刻と終了時刻です。
アンロードするデータ
タスクAでは、下記の条件でアンロードするデータを絞り込みます。
- タスクAが起動した時刻の時間(HOUR)と
STOPTIME
の時間(HOUR)が一致するデータ- 例)15:00に起動した場合、
STOPTIME
が15時台のデータのみアンロードする
- 例)15:00に起動した場合、
ロードするデータ
タスクBでは、下記の3つの項目をロードします。
- ロードしたファイルの名前
STOPTIME
(シェアサイクリングサービスの終了時刻)- タスクBの実行時刻
準備
実際にタスクを作成る前に、準備を行います。
ワークシート上で下記SQLを実行します。
コンテキスト設定
USE
コマンドで今回の検証で使用するコンテキストを設定します。
USE ROLE SYSADMIN; USE WAREHOUSE COMPUTE_WH; USE CITIBIKE; USE SCHEMA PUBLIC;
ロード先のテーブル作成
CRATE TABLE
コマンドで2013年、2014年、2015年の3つのテーブルを作成します。
create or replace table TRIPS_2013_TABLE ( filename varchar, stoptime timestamp, load_datetime timestamp ); create or replace table TRIPS_2014_TABLE like TRIPS_2013_TABLE; create or replace table TRIPS_2015_TABLE like TRIPS_2013_TABLE;
アンロード先のステージ作成
CREATE STAGE
コマンドでアンロード先のステージを作成します。
今回は名前付きステージを使用します。
create or replace stage TRIPS_UNLOAD_STAGE;
タイムゾーン設定
検証を進めていく途中でタイムスタンプを確認しますが、
タイムゾーンがデフォルトのAmerica/Los_Angeles
(太平洋標準時)のままだと分かりにくいので、
ALTER
コマンドでタイムゾーンをAsia/Tokyo
に変更しておきます。
alter session set TIMEZONE = 'Asia/Tokyo';
準備が整いました。
それでは、実際にタスクを作成していきましょう。
タスク作成
タスクは、CREATE TASK
コマンドで作成することができます。
タスクA:ステージにアンロードするタスク
create or replace task TRIPS_UNLOAD_TASK WAREHOUSE = COMPUTE_WH SCHEDULE = 'USING CRON 0 */1 * * * Asia/Tokyo' TIMEZONE = 'Asia/Tokyo' COMMENT = ' TRIPSテーブルからデータをアンロード' AS copy into @TRIPS_UNLOAD_STAGE from (select * from TRIPS where date_part(year, stoptime) in (2013, 2014, 2015) and date_part(hour, stoptime) = date_part(hour, current_timestamp)) partition by (date_part(year, stoptime) || '/' || date_part(hour, stoptime)) file_format = (format_name = CSV compression='GZIP') ;
2〜5行目では、タスクのパラメータを指定しています。
WAREHOUSE
:タスクの実行に使用される仮想ウェアハウスを指定します。SCHEDULE
:タスクを定期的に実行するスケジュールを指定します。- cron式とタイムゾーンを指定します。今回は、毎時0分で指定します。
TIMEZONE
:セッションのタイムゾーンを指定します。- クエリで
current_timestamp
を使用しているので、Asia/Tokyo
を指定しておきます。
- クエリで
COMMENT
:タスクのコメントを指定します。
7〜12行目では、タスクで実行されるSQLステートメントを指定しています。
COPY
コマンドでTRIPS_UNLOAD_STAGE
ステージにアンロードするクエリです。
STOPTIME
でアンロードするデータを絞り込んでいます。- 年(YEAR):2013年 or 2014年 or 2015年
- 時間(HOUR):現在時刻(タスク実行時刻)の時間
PARTITON BY
を指定してアンロードするファイルを分割しています。STOPTIME
をYYYY/HH
形式に変換した値で分割
PARTITON BY
については、以下の記事でも紹介しています。
タスクB:テーブルにロードするタスク
2013年、2014年、2015年でテーブルが分かれているので、別々にタスクを作成します。
2013年
create or replace task TRIPS_LOAD_2013_TASK WAREHOUSE = COMPUTE_WH TIMEZONE = 'Asia/Tokyo' COMMENT = '2013年のファイルをロード' after TRIPS_UNLOAD_TASK AS copy into TRIPS_2013_TABLE from(select metadata$filename, $3, current_timestamp from @TRIPS_UNLOAD_STAGE/2013 (file_format => CSV)) ;
2〜5行目では、タスクのパラメータを指定しています。
after
:現在のタスクの先行タスクを指定します。- タスクAの後に実行させたいので、
TRIPS_UNLOAD_TASK
を指定します。
- タスクAの後に実行させたいので、
なお、after
を指定した場合は、SCHEDULE
を指定することはできません。
7〜10行目では、タスクで実行されるSQLステートメントを指定しています。
COPY
コマンドでTRIPS_2013_TABLE
テーブルにロードするクエリです。
metadata$filename
は、ロードするファイルの名前です。$3
は、ロードするファイルの3番目のフィールドでSTOPTIME
です。current_timestamp
は、現在のタイムスタンプを返す関数(=タスクの実行時刻)です。
ステージにあるファイルのクエリについては、以下の記事でも紹介しています。
2014年、2015年も同様に作成します。
2013年と異なるのは、COMMENT
の内容(4行目)とロードするファイルパス(9行目)の2箇所です。
2014年
create or replace task TRIPS_LOAD_2014_TASK WAREHOUSE = COMPUTE_WH TIMEZONE = 'Asia/Tokyo' COMMENT = '2014年のファイルをロード' after TRIPS_UNLOAD_TASK AS copy into TRIPS_2014_TABLE from(select metadata$filename, $3, current_timestamp from @TRIPS_UNLOAD_STAGE/2014 (file_format => CSV)) ;
2015年
create or replace task TRIPS_LOAD_2015_TASK WAREHOUSE = COMPUTE_WH TIMEZONE = 'Asia/Tokyo' COMMENT = '2015年のファイルをロード' after TRIPS_UNLOAD_TASK AS copy into TRIPS_2015_TABLE from(select metadata$filename, $3, current_timestamp from @TRIPS_UNLOAD_STAGE/2013 (file_format => CSV)) ;
タスク起動
作成したタスクを確認してみましょう。
SHOW TASKS
コマンドでタスクをリストを取得できます。
state
は、タスクの現在の状態です。
started
:開始suspend
:一時停止
この段階では、タスクは一時停止状態になっているので開始してあげる必要があります。
タスクを起動するには「グローバル権限」のEXECUTE TASK
が必要となるので、
ACCOUNTADMIN
でSYSADMIN
に権限を付与します。
USE ROLE ACCOUNTADMIN; GRANT EXECUTE TASK ON ACCOUNT TO ROLE SYSADMIN; USE ROLE sysadmin;
ALTER TASK
コマンドでタスクをプロパティを変更することができます。
resume
を指定して開始済み状態に変更します。
依存関係を持つタスクの場合、子タスク(タスクB)の方から先に開始済みにする必要があります。
alter task TRIPS_LOAD_2013_TASK resume; alter task TRIPS_LOAD_2014_TASK resume; alter task TRIPS_LOAD_2015_TASK resume; alter task TRIPS_UNLOAD_TASK resume;
SHOW TASKS
コマンドでタスクの状態を確認してみましょう。
タスクが開始状態になりましたね。
この状態で3時間ほど放置します。
タスク履歴確認
3時間経過しました。
タスクが実行されているか確認してみましょう。
TASK_HISTORY
テーブル関数でタスクの使用履歴を取得することができます。
select * from table(information_schema.task_history());
タスクの使用履歴が取得できました。
13時(17行目)から1時間ごとに4つのタスクが実行されていますね。
4つのタスクの中身を見てみると、
タスクA(赤枠囲み)のあとに3つのタスクB(青枠囲み)が実行されていることが分かります。
1行目のタスク(緑枠囲み)は、17時開始でスケジューリングされたタスクAです。
クエリ実行前なのでQUERY ID
がNULLになっています。
なお、今回は実行時刻を確認したいので表示する列を絞り込んでいます。
テーブル確認
ロードしたテーブルの中身も確認しておきましょう。
下記SQLを実行して、
テーブルごとにロードされたファイルの名前、STOPTIME
、タスク実行時刻をクエリします。
2013年
select filename, max(stoptime), max(load_datetime) from TRIPS_2013_TABLE group by 1 order by 1;
2014年
select filename, max(stoptime), max(load_datetime) from TRIPS_2014_TABLE group by 1 order by 1;
2015年
select filename, max(stoptime), max(load_datetime) from TRIPS_2015_TABLE group by 1 order by 1;
各テーブルに該当する年のファイルがロードされています。
タスクの実行時刻も1時間ごとになっていますね。
タスク停止
タスクはしばらく使用しないので、停止しておきましょう。
ALTER TASK
コマンドでsuspend
を指定して停止状態に変更します。
起動時とは逆に、親タスク(タスクA)の方から先に停止する必要があります。
alter task TRIPS_UNLOAD_TASK suspend; alter task TRIPS_LOAD_2013_TASK suspend; alter task TRIPS_LOAD_2014_TASK suspend; alter task TRIPS_LOAD_2015_TASK suspend;
まとめ
以上、定期的にCOPY
コマンドを実行するタスクを作ってみました。
繰り返し実行するSQLステートメントにスケジュールなどのパラメータを指定するだけで
定期的に実行するタスクを作成することができました。
今回は、SQLステートメントを実行するタスクを作成しましたが、
ストアドプロシージャを実行するタスクを作成することも可能です。